Xenstore client library spawns a reader thread the first
authorkaf24@firebug.cl.cam.ac.uk <kaf24@firebug.cl.cam.ac.uk>
Tue, 11 Oct 2005 12:02:59 +0000 (13:02 +0100)
committerkaf24@firebug.cl.cam.ac.uk <kaf24@firebug.cl.cam.ac.uk>
Tue, 11 Oct 2005 12:02:59 +0000 (13:02 +0100)
time a watch is registered. Before this it is fine for
caller threads to read the comms channel directly as no
async messages will be received.

This avoids various user tools needlessly creating three
threads where one will do the job.

Signed-off-by: Keir Fraser <keir@xensource.com>
tools/xenstore/xs.c

index acd71108d9b43d745aa4d7d6d347ca2912715601..e8a870e2a804db7cae84ca8bcc3de9ce15a3296e 100644 (file)
@@ -52,6 +52,7 @@ struct xs_handle {
          * signals waiters.
          */
        pthread_t read_thr;
+       int read_thr_exists;
 
        /*
          * A list of fired watch messages, protected by a mutex. Users can
@@ -77,6 +78,7 @@ struct xs_handle {
        pthread_mutex_t request_mutex;
 };
 
+static int read_message(struct xs_handle *h);
 static void *read_thread(void *arg);
 
 int xs_fileno(struct xs_handle *h)
@@ -131,7 +133,7 @@ static struct xs_handle *get_handle(const char *connect_to)
        int fd = -1, saved_errno;
 
        if (stat(connect_to, &buf) != 0)
-               goto error;
+               return NULL;
 
        if (S_ISSOCK(buf.st_mode))
                fd = get_socket(connect_to);
@@ -139,11 +141,17 @@ static struct xs_handle *get_handle(const char *connect_to)
                fd = get_dev(connect_to);
 
        if (fd == -1)
-               goto error;
+               return NULL;
 
        h = malloc(sizeof(*h));
-       if (h == NULL)
-               goto error;
+       if (h == NULL) {
+               saved_errno = errno;
+               close(fd);
+               errno = saved_errno;
+               return NULL;
+       }
+
+       memset(h, 0, sizeof(*h));
 
        h->fd = fd;
 
@@ -160,19 +168,7 @@ static struct xs_handle *get_handle(const char *connect_to)
 
        pthread_mutex_init(&h->request_mutex, NULL);
 
-       if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0)
-               goto error;
-
        return h;
-
- error:
-       saved_errno = errno;
-       if (h != NULL)
-               free(h);
-       if (fd != -1)
-               close(fd);
-       errno = saved_errno;
-       return NULL;
 }
 
 struct xs_handle *xs_daemon_open(void)
@@ -198,9 +194,11 @@ void xs_daemon_close(struct xs_handle *h)
        pthread_mutex_lock(&h->reply_mutex);
        pthread_mutex_lock(&h->watch_mutex);
 
-       /* XXX FIXME: May leak an unpublished message buffer. */
-       pthread_cancel(h->read_thr);
-       pthread_join(h->read_thr, NULL);
+       if (h->read_thr_exists) {
+               /* XXX FIXME: May leak an unpublished message buffer. */
+               pthread_cancel(h->read_thr);
+               pthread_join(h->read_thr, NULL);
+       }
 
        list_for_each_entry_safe(msg, tmsg, &h->reply_list, list) {
                free(msg->body);
@@ -271,6 +269,10 @@ static void *read_reply(
        struct xs_stored_msg *msg;
        char *body;
 
+       /* Read from comms channel ourselves if there is no reader thread. */
+       if (!h->read_thr_exists && (read_message(h) == -1))
+               return NULL;
+
        pthread_mutex_lock(&h->reply_mutex);
        while (list_empty(&h->reply_list))
                pthread_cond_wait(&h->reply_condvar, &h->reply_mutex);
@@ -541,6 +543,17 @@ bool xs_watch(struct xs_handle *h, const char *path, const char *token)
 {
        struct iovec iov[2];
 
+       /* We dynamically create a reader thread on demand. */
+       pthread_mutex_lock(&h->request_mutex);
+       if (!h->read_thr_exists) {
+               if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0) {
+                       pthread_mutex_unlock(&h->request_mutex);
+                       return false;
+               }
+               h->read_thr_exists = 1;
+       }
+       pthread_mutex_unlock(&h->request_mutex);
+
        iov[0].iov_base = (void *)path;
        iov[0].iov_len = strlen(path) + 1;
        iov[1].iov_base = (void *)token;
@@ -717,65 +730,72 @@ char *xs_debug_command(struct xs_handle *h, const char *cmd,
                        ARRAY_SIZE(iov), NULL);
 }
 
-static void *read_thread(void *arg)
+static int read_message(struct xs_handle *h)
 {
-       struct xs_handle *h = arg;
        struct xs_stored_msg *msg = NULL;
        char *body = NULL;
+       int saved_errno;
 
-       for (;;) {
-               msg = NULL;
-               body = NULL;
-
-               /* Allocate message structure and read the message header. */
-               msg = malloc(sizeof(*msg));
-               if (msg == NULL)
-                       goto error;
-               if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr)))
-                       goto error;
-
-               /* Allocate and read the message body. */
-               body = msg->body = malloc(msg->hdr.len + 1);
-               if (body == NULL)
-                       goto error;
-               if (!read_all(h->fd, body, msg->hdr.len))
-                       goto error;
-               body[msg->hdr.len] = '\0';
-
-               if (msg->hdr.type == XS_WATCH_EVENT) {
-                       pthread_mutex_lock(&h->watch_mutex);
+       /* Allocate message structure and read the message header. */
+       msg = malloc(sizeof(*msg));
+       if (msg == NULL)
+               goto error;
+       if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr)))
+               goto error;
 
-                       /* Kick users out of their select() loop. */
-                       if (list_empty(&h->watch_list) &&
-                           (h->watch_pipe[1] != -1))
-                               while (write(h->watch_pipe[1], body, 1) != 1)
-                                       continue;
+       /* Allocate and read the message body. */
+       body = msg->body = malloc(msg->hdr.len + 1);
+       if (body == NULL)
+               goto error;
+       if (!read_all(h->fd, body, msg->hdr.len))
+               goto error;
+       body[msg->hdr.len] = '\0';
 
-                       list_add_tail(&msg->list, &h->watch_list);
-                       pthread_cond_signal(&h->watch_condvar);
+       if (msg->hdr.type == XS_WATCH_EVENT) {
+               pthread_mutex_lock(&h->watch_mutex);
 
-                       pthread_mutex_unlock(&h->watch_mutex);
-               } else {
-                       pthread_mutex_lock(&h->reply_mutex);
+               /* Kick users out of their select() loop. */
+               if (list_empty(&h->watch_list) &&
+                   (h->watch_pipe[1] != -1))
+                       while (write(h->watch_pipe[1], body, 1) != 1)
+                               continue;
 
-                       /* There should only ever be one response pending! */
-                       if (!list_empty(&h->reply_list)) {
-                               pthread_mutex_unlock(&h->reply_mutex);
-                               goto error;
-                       }
+               list_add_tail(&msg->list, &h->watch_list);
+               pthread_cond_signal(&h->watch_condvar);
 
-                       list_add_tail(&msg->list, &h->reply_list);
-                       pthread_cond_signal(&h->reply_condvar);
+               pthread_mutex_unlock(&h->watch_mutex);
+       } else {
+               pthread_mutex_lock(&h->reply_mutex);
 
+               /* There should only ever be one response pending! */
+               if (!list_empty(&h->reply_list)) {
                        pthread_mutex_unlock(&h->reply_mutex);
+                       goto error;
                }
+
+               list_add_tail(&msg->list, &h->reply_list);
+               pthread_cond_signal(&h->reply_condvar);
+
+               pthread_mutex_unlock(&h->reply_mutex);
        }
 
+       return 0;
+
  error:
-       if (body != NULL)
-               free(body);
-       if (msg != NULL)
-               free(msg);
+       saved_errno = errno;
+       free(msg);
+       free(body);
+       errno = saved_errno;
+       return -1;
+}
+
+static void *read_thread(void *arg)
+{
+       struct xs_handle *h = arg;
+
+       while (read_message(h) != -1)
+               continue;
+
        return NULL;
 }